BigQuery に日時データをロードすると UTC に変換されてしまう問題に「データ統合基盤 CS アナリティクス」で対応する
こんにちは、みかみです。
弊社クラスメソッドの自社プロダクト CS アナリティクス(以下 CSA )は、短期間、低コストで導入可能な統合データ分析基盤です。
概要
BigQuery のデフォルトタイムゾーンは UTC なので、タイムゾーン情報がない日時データを TIMESTAMP
型でロードすると、BigQuery に格納されたデータのタイムゾーンは UTC になってしまいます。
そのため、日時データを JST で BigQuery にロードするには、ロード前のデータにタイムゾーン情報を付与するか、テーブルカラムのデータ型を TIMESTAMP
ではなく DATETIME
型にしてロードする必要があります。
テーブルデータ型 | ロード前データ | BigQuery 格納データ | ロード前後の データ整合性 |
---|---|---|---|
TIMESTAMP | 2020-10-01 09:00:00 | 2020-10-01 09:00:00 UTC | NG |
TIMESTAMP | 2020-10-01T09:00:00+09:00 | 2020-10-01 00:00:00 UTC | OK |
DATETIME | 2020-10-01 09:00:00 | 2020-10-01T09:00:00 | OK |
DATETIME | 2020-10-01T09:00:00+09:00 | なし(ロードエラー) | NG |
CSA では以下の構成要素を画面操作で登録し、登録した構成要素を組み合わせてジョブを作成して実行することが可能です。
- データ連携:ファイルストレージから DWH へのデータロード
- SQL:SQL の実行
- プログラム:Python プログラムの実行
今回は CSA のジョブを作成して、前述の 2 つの方法で日時データを BigQuery にロードしてみます。
CSA JMCの挙動確認バージョン
当エントリの内容は以下のCSA JMCバージョンで挙動を確認しています。
- CSA JMC v5.0.0
前提
CSA の 構成要素設定、BigQuery接続設定手順は割愛します。詳細は以下のエントリをご参照ください。
プログラム、SQL、データ連携構成要素の CSA への登録手順およびジョブ作成・実行手順も省きますので、詳細は過去エントリをご参照ください。
- データ統合基盤 CS アナリティクスで BigQuery にデータを差分ロードしてみた | Developers.IO
- データ統合基盤 CS アナリティクスで Google Cloud クライアントライブラリを使用した Python プログラムを実行してみる | Developers.IO
また、CSA からはクライアントライブラリを使用して Google API 経由で BigQuery および GCS にアクセスするため、GCP の管理コンソールから BigQuery API と Cloud Storage API を有効に設定する必要があります。
日時データを BigQuery にロードした結果を確認
初めに、タイムゾーンが付与されていない日時データをそのまま BigQuery にロードした場合の結果を確認してみます。
確認用に以下の CSV データを準備しました。
ロードと同時に新規でテーブル作成をするように指定して、BigQuery にデータロードした結果は以下です。
BigQuery ではデータロード時にファイルフォーマットに合わせて自動でデータ型を判断し、新規テーブルを作成することが可能です。日時データの場合、テーブルのデータ型は TIMESTAMP
型で自動検出してくれました。
TIMESTAMP
型はタイムゾーン付きのデータ型なので、ロード前のデータにタイムゾーン情報がない場合は、BigQuery のデフォルトタイムゾーンである UTC が付与されてしまいます。
なお、CLI などから SQL を実行してデータを参照する場合には、タイムゾーンは明示されません。
gcp_da_user@cloudshell:~ (csa-dev-v5)$ bq query --nouse_legacy_sql \ > 'SELECT col_timestamp FROM csa_mikami.load_timestamp LIMIT 3' Waiting on bqjob_r7bcf5afd59db54bf_00000174f891d316_1 ... (0s) Current status: DONE +---------------------+ | col_timestamp | +---------------------+ | 2020-06-28 15:18:31 | | 1973-01-12 05:32:31 | | 1996-10-24 16:06:47 | +---------------------+
試しに使ってみる、またはアドホック分析など限定的な用途にしか使用しない場合、BigQuery 格納データのタイムゾーンと実際のデータのタイムゾーンが異なっていても気にする必要はないケースもあると思いますが、やはり本番運用で保守性などを考慮すると BigQuery にも連携データと同じタイムゾーンで日時データを格納しておきたいものです。。
ロードデータにタイムゾーンを付与してから BigQuery にロード
日時データに JST のタイムゾーンを付与する Python プログラムを実行し、フォーマット変換したデータファイルを BigQuery にロードします。
Python プログラムの実行とデータロード処理は、CSA から画面操作で登録し、ジョブを作成して実行します。
GCS に配置してあるデータファイルを取得し、日時項目に JST のタイムゾーンを付与する以下の Python プログラムを CSA に登録します。
from google.cloud import storage from io import BytesIO import pandas as pd from datetime import datetime, timedelta, timezone import os import csa_env def main(): # get parameters vars = csa_env.get().get('vars') bucket_name = vars.get('BUCKET') path_src = vars.get('PATH_SRC') path_dst = vars.get('PATH_DST') # get blob objects client = storage.Client() bucket = client.get_bucket(bucket_name) blobs = client.list_blobs(bucket_name, prefix=path_src) for blob in blobs: if blob.name == path_src: continue print(blob.name) blob = bucket.blob(blob.name) content = blob.download_as_string() df = pd.read_csv(BytesIO(content)) print('before: {}'.format(df['col_timestamp'][0])) # formate timestamp jst = timezone(timedelta(hours=+9), 'JST') df['col_timestamp'] = df['col_timestamp'].map(lambda x: datetime.strptime(x, '%Y/%m/%d %H:%M:%S').replace(tzinfo=jst).isoformat()) print('after : {}'.format(df['col_timestamp'][0])) # upload formatted file file_name = os.path.basename(blob.name) blob = bucket.blob('{}{}'.format(path_dst, file_name)) blob.upload_from_string(data=df.to_csv(index=False))
GCS バケット名、ファイル取得元のパス、フォーマット変換後のファイル配置パスは、後ほど CSA のジョブ作成画面から実行引数で指定します。
続いて、フォーマット変換後のデータファイルを BigQuery にロードする「データ連携」構成要素を登録しました。
最後に、登録した構成要素を Python プログラム→データ連携の順に実行するジョブを作成し、実行します。
CSA の実行ログ画面から、ジョブが正常に完了したことが確認できました。
GCP 管理コンソールからも、フォーマット変換後のファイルデータと BigQuery にロードされたデータを確認してみます。
BigQuery の格納データは UTC になるので一見わかりにくいですが、付与したタイムゾーン( JST )に合わせて、ロード前のデータの日時から -09:00
の UTC で BigQuery に格納されたことが確認できました。
SQL で参照する場合には、FORMAT_TIMESTAMP
関数で JST に変換する必要があるのでご注意ください。
gcp_da_user@cloudshell:~ (csa-dev-v5)$ bq query --nouse_legacy_sql \ > 'SELECT FORMAT_TIMESTAMP("%Y-%m-%d %H:%M:%S", col_timestamp, "Asia/Tokyo") AS col_timestamp FROM csa_mikami.load_timestamp LIMIT 3' Waiting on bqjob_r2a939e99c3db9982_00000174f8bc1c31_1 ... (0s) Current status: DONE +---------------------+ | col_timestamp | +---------------------+ | 2020-06-28 15:18:31 | | 1973-01-12 05:32:31 | | 1996-10-24 16:06:47 | +---------------------+
TIMESTAMP 型ではなく DATETIME 型で BigQuery にロード
他に、複数のタイムゾーンにまたがるデータをロードする可能性がない場合には、日時データを TIMESTAMP
型ではなく DATETIME
型で BigQuery にロードする方法もあります。
DATETIME
型であればタイムゾーン情報が付与されないため、ロード前後でデータの整合がずれることはありません。
しかし、ロードと同時にファイルフォーマットを判定して自動的にテーブルを新規作成すると日時データのデータ型は TIMESTAMP
型になってしまうので、日時項目のカラムを明示的に DATETIME
型で定義したテーブルを作成してからファイルデーをロードする必要があります。
また、DATETIME
型にロードする場合、データの日時フォーマットがスラッシュ区切りだとエラーになってしまいます。
日時項目のカラムを DATETIME
型で定義した CREATE TABLE
の SQL と、日時データを YYYY-MM-DD hh:mm:ss
フォーマットに変換してからロードする CSA のジョブを作成して実行してみます。
日時項目を DATETIME
型で定義したテーブルを作成する、以下の SQL を CSA に登録しました。
データセット名、テーブル名は実行引数で指定するため、プレースホルダーで記載しています。
CREATE TABLE IF NOT EXISTS {{ vars.DATASET }}.{{ vars.TABLE }} ( col_date DATE, col_timestamp DATETIME )
また、日時データフォーマット変換と BigQuery へのロードを実行する以下の Python プログラムを CSA に登録しました。 バケット名、テーブル名などのリテラルは、SQL 同様ジョブの実行引数で指定します。
from google.cloud import storage from google.cloud import bigquery from io import BytesIO, StringIO import pandas as pd from datetime import datetime as dt import csa_env def main(): # get parameters vars = csa_env.get().get('vars') bucket_name = vars.get('BUCKET') path_src = vars.get('PATH_SRC') dataset_id = vars.get('DATASET') table_id = vars.get('TABLE') # get blob objects client = storage.Client() bucket = client.get_bucket(bucket_name) blobs = client.list_blobs(bucket_name, prefix=path_src) for blob in blobs: if blob.name == path_src: continue print(blob.name) blob = bucket.blob(blob.name) content = blob.download_as_string() df = pd.read_csv(BytesIO(content)) # formate timestamp df['col_date'] = df['col_date'].map(lambda x: dt.strptime(x, '%Y/%m/%d').strftime('%Y-%m-%d')) df['col_timestamp'] = df['col_timestamp'].map(lambda x: dt.strptime(x, '%Y/%m/%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')) print('formatted:\n{}'.format(df)) # load data to bigquery client_bigquery = bigquery.Client() job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND') load_job = client_bigquery.load_table_from_file( StringIO(df.to_csv(header=False, index=False)), '{}.{}'.format(dataset_id, table_id), job_config=job_config, ) print("Starting job {}".format(load_job.job_id)) load_job.result() print("Job finished.")
SQL と Python プログラムを実行するジョブを作成して実行します。
CSA の実行ログから、ジョブが正常に実行されたことを確認できました。
BigQuery 管理コンソールからも、ロード済みのデータを確認してみます。
DATATIME
型のテーブルが作成され、TIMESTAMP
型とは異なり、ロードデータにはタイムゾーンが付与されません。
CLI から SQL を実行してデータを参照してみます。
gcp_da_user@cloudshell:~ (csa-dev-v5)$ bq query --nouse_legacy_sql \ > 'SELECT col_timestamp FROM csa_mikami.load_datetime LIMIT 3' Waiting on bqjob_r344c44fecbf00e45_00000175019695ef_1 ... (0s) Current status: DONE +---------------------+ | col_timestamp | +---------------------+ | 2020-06-28T15:18:31 | | 1973-01-12T05:32:31 | | 1996-10-24T16:06:47 | +---------------------+
DATATIME
型にはタイムゾーン情報が付与されないため FORMAT_TIMESTAMP
関数を使う必要はなく、ロード前のデータの日時がそのままロードされていることが確認できました。
まとめ
CSA では SQL や Python プログラムファイルを登録して、画面操作で簡単にジョブを作成・実行することが可能です。 実行引数が指定できるので、プレースホルダーを使用した汎用的な SQL やプログラムを登録しておけば、データの種類が増えた場合にジョブを追加するのも簡単です。 ジョブのスケジュール実行や実行通知設定も可能なので、毎時、日次、月次などの定時バッチの運用にも便利です。
BigQuery にも対応可能な統合データ分析基盤 CSA について、少しでもご興味をお持ちいただけましたら、ぜひお気軽に弊社クラスメソッドにご連絡ください!